Skip to content

工作原理

发送端

  1. 发送端插入消息(生成序列号,记录事务号),消息状态为操作前
  2. 业务操作,若成功,改消息状态为已操作,若失败,删除消息
  3. 通知服务发消息

服务端

  1. 接收发消息通知
  2. 配置队列订阅关系
  3. 发送消息到队列,若成功,改消息状态为已发送,若失败无视
  4. 接收消息,写入推送表,如果已经写入,则不写入
  5. 消息ack
  6. 查询同一个订阅者下,相同key的序号更小的消息是否都已成功推送,如果还有没推送的则不推送,如果都已推送,则推送本条消息,这样可以保证同一个订阅者下相同key的消息保序
  7. 推送, 两个条件下认为成功:1.返回 http code 为200,且json code 为0,说明接口成功执行,2. 返回http code 为404说明接口不存在,这种情况只应该存在于灰度状态,说明当前灰度不应该推送消息。除这两个条件,其他都认为失败
  8. 修改推送状态为成功推送或者推送失败

校验补漏部分

  1. 未操作状态的消息,校验实际业务已操作的,补发,否则删除
  2. 已操作状态的消息,补发
  3. 已发送状态的消息,校验推送表是否已完整收到,若没有,补发,若都已收到,标记已接收
  4. 已接收和推送失败状态的推送,补退

注:队列关系配置逻辑

  1. 发送消息到队列前服务端会确保队列内的订阅关系配置正确
  2. exchange 的名字取 topic 名
  3. 队列名取订阅名.序号,序号是从0开始的数字,订阅的 tag 是 topic.tag.序号
  4. 发送消息的标签格式是:topic.消息本身带的标签.序号
  5. 消息标签中的序号由发送端计算得到,假如队列有4个,那么根据key值对4取余得到序号
  6. 服务端发送消息前会根据队列的堆积情况处理队列:1. 如果当前队列平均堆积超过一个阈值,增加订阅队列和接收消息线程,根据新的队列数计算消息标签中的序号值,2. 如果当前队列中平均堆积小于1,减少一个队列计数,根据新的队列计数计算序号值,3. 假如队列中有队列超过1min没有收到新的消息,删除队列,但会至少保证有一个订阅队列。